/*
 * To change this license header, choose License Headers in Project Properties.
 * To change this template file, choose Tools | Templates
 * and open the template in the editor.
 */
package cn.ac.iie.di.splitor;

import cn.ac.iie.di.global.GlobalParas;
import cn.ac.iie.di.tools.FileTools;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import org.apache.log4j.Logger;

/**
 * Worker that takes file names from {@code dataQueue}, reads each file line by line
 * (decoded as UTF-8) and forwards every line to {@code sendQueue}. After a file has
 * been read completely it is handed to {@code FileTools.moveFileToNewPath}.
 *
 * <p>The worker runs until its thread is interrupted.
 *
 * @author Austin
 */
public class FileRead implements Runnable {

    public static Logger logger = Logger.getLogger(FileRead.class.getName());

    /** Number of forwarded lines between two debug reports about the send queue. */
    private static final int QUEUE_REPORT_INTERVAL = 1000;

    /** Passed as the second argument to {@code FileTools.moveFileToNewPath} for every processed file. */
    String filePath;
    /** Source of file names to process. */
    ArrayBlockingQueue<String> dataQueue = null;
    /** Sink that receives every line read from the files. */
    ArrayBlockingQueue<String> sendQueue = null;
    /** Counts forwarded lines since the last debug report; reset once it reaches the report interval. */
    AtomicInteger blockingQSize = new AtomicInteger(0);

    /**
     * @param filePath  path handed to {@code FileTools.moveFileToNewPath} once a file has been read
     * @param dataQueue queue that supplies the names of the files to read
     * @param sendQueue queue that receives the lines of those files
     */
    public FileRead(String filePath, ArrayBlockingQueue<String> dataQueue, ArrayBlockingQueue<String> sendQueue) {
        this.filePath = filePath;
        this.dataQueue = dataQueue;
        this.sendQueue = sendQueue;
    }

    /**
     * Polls {@code dataQueue} (500 ms timeout per attempt) and processes each file name
     * received. Returns when the executing thread is interrupted.
     */
    public void run() {
        // The flag is also set by readFileByLines when it is interrupted mid-file.
        while (!Thread.currentThread().isInterrupted()) {
            try {
                String fileName = dataQueue.poll(500, TimeUnit.MILLISECONDS);
                if (fileName == null) {
                    continue;
                }
                logger.info("file:" + fileName);
                readFileByLines(fileName, filePath);
            } catch (InterruptedException ex) {
                // poll() cleared the interrupt flag when it threw; restore it so the
                // owner of this thread can still observe the interruption, then stop.
                Thread.currentThread().interrupt();
                logger.warn("FileRead worker interrupted, stopping", ex);
                return;
            }
        }
    }

    /**
     * Reads {@code fileName} line by line and puts every line on {@code sendQueue},
     * blocking while the queue is full. When the whole file has been forwarded it is
     * passed to {@code FileTools.moveFileToNewPath(fileName, newFileName)}.
     *
     * <p>Failures are logged, not propagated. If the thread is interrupted while waiting
     * for queue space, reading stops, the file is not moved and the interrupt flag is
     * left set.
     *
     * @param fileName    path of the file to read
     * @param newFileName second argument for {@code FileTools.moveFileToNewPath}
     */
    public void readFileByLines(String fileName, String newFileName) {
        BufferedReader reader = null;
        try {
            long begin = System.currentTimeMillis();
            reader = new BufferedReader(new InputStreamReader(new FileInputStream(fileName), "UTF-8"));
            String tempString;
            int line = 0;
            // Read one line at a time; readLine() returns null at end of file.
            while ((tempString = reader.readLine()) != null) {
                reportQueueStatePeriodically();
                // put() blocks until space is available instead of spinning on offer().
                sendQueue.put(tempString);
                line++;
            }
            // Close before moving so the file is no longer held open by this process.
            reader.close();

            logger.info("There is " + line + " in " + fileName + "！ ");

            FileTools.moveFileToNewPath(fileName, newFileName);
            long end = System.currentTimeMillis();
            logger.debug("consume " + (end - begin) + " ms");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("interrupted while reading " + fileName + ", file was not moved", e);
        } catch (Exception e) {
            logger.error("the error is :" + e, e);
        } finally {
            closeQuietly(reader);
        }
    }

    /**
     * Every {@code QUEUE_REPORT_INTERVAL} forwarded lines, resets the line counter and
     * logs the remaining capacity of {@code sendQueue} at debug level.
     */
    private void reportQueueStatePeriodically() {
        if (blockingQSize.incrementAndGet() >= QUEUE_REPORT_INTERVAL) {
            synchronized (blockingQSize) {
                // Re-check under the lock so only one thread resets and reports.
                if (blockingQSize.get() >= QUEUE_REPORT_INTERVAL) {
                    blockingQSize.set(0);
                    // Note: the value logged is the remaining capacity, not the element count.
                    logger.debug("ArrayBlockingQueue Size is " + sendQueue.remainingCapacity());
                }
            }
        }
    }

    /** Closes the reader if it is non-null, logging (not throwing) any failure. */
    private static void closeQuietly(BufferedReader reader) {
        if (reader == null) {
            return;
        }
        try {
            // Closing an already closed reader has no effect.
            reader.close();
        } catch (IOException e) {
            logger.warn("failed to close reader: " + e, e);
        }
    }
}
